import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
//import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types, ValidationException}


/**
 * Flink Table API job that reads rows from a CSV file, projects three
 * columns (`name`, `grade`, `rate`) and appends them to the ClickHouse table
 * `sink_table` through a JDBC append sink.
 */
object StreamingJob_connector_jdbc {

  /**
   * Builds the CSV source, the JDBC sink and the projection between them,
   * then submits the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the path, JDBC URL and credentials are hard-coded;
    // consider moving them to configuration before using this outside a demo.
    val SourceCsvPath = "/home/appleyuchi/桌面/source.csv"
    val CkJdbcUrl = "jdbc:clickhouse://Desktop:8123/default"
    val CkUsername = "default"
    val CkPassword = "appleyuchi"
    val BatchSize = 1 // set your batch size here

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // CSV source: the first line is skipped and fields are comma-separated.
    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("name", Types.STRING)
      .field("age", Types.LONG)
      .field("sex", Types.STRING)
      .field("grade", Types.LONG)
      .field("rate", Types.FLOAT)
      .build()

    tEnv.registerTableSource("source", csvTableSource)

    // Only these three columns are forwarded to the sink.
    val resultTable = tEnv.scan("source").select("name, grade, rate")

    // Parameterized insert; the placeholders line up with the
    // parameter types given to the sink builder below.
    val insertIntoCkSql =
      """
        |  INSERT INTO sink_table (
        |    name, grade, rate
        |  ) VALUES (
        |    ?, ?, ?
        |  )
        """.stripMargin

    // Write the data to the ClickHouse sink.
    val sink = JDBCAppendTableSink
      .builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl(CkJdbcUrl)
      .setUsername(CkUsername)
      .setPassword(CkPassword)
      .setQuery(insertIntoCkSql)
      .setBatchSize(BatchSize)
      .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT)
      .build()

    tEnv.registerTableSink(
      "sink",
      Array("name", "grade", "rate"),
      Array(Types.STRING, Types.LONG, Types.FLOAT),
      sink
    )

    tEnv.insertInto(resultTable, "sink")

    // Trigger the job through the table environment: since Flink 1.11 a pure
    // Table program is no longer attached to the StreamExecutionEnvironment,
    // so env.execute() fails with "No operators defined in streaming topology".
    tEnv.execute("Flink Table API to ClickHouse Example")
  }
}